Skip to content

Conversation

@saqadri
Copy link
Collaborator

@saqadri saqadri commented Aug 29, 2025

Add @app.tool and @app.async_tool decorators, which auto-generate workflows and tools for those functions.

Summary

This PR introduces function-as-workflow decorators to simplify building and deploying workflows/tools, unifies naming and schema behavior, and tightens server registration and status behavior. It adds tests validating tool registration, status, and schema generation.

Key changes

  • New decorators

    • @app.tool(name=...):
      • Wraps a plain function into an auto Workflow, whose run function calls the decorated function.
      • Registers 2 tools: <fn_name> (sync wrapper), <fn_name>-get_status.
      • Waits for completion and returns the original function’s return (unwraps WorkflowResult).
    • @app.async_tool(name=...):
      • Wraps a plain function into an auto Workflow.
      • Registers tools: <fn_name>-async-run, <fn_name>-async-get_status.
      • Does not return the result; returns workflow_id/run_id.
  • Auto workflow generation

    • AutoWorkflow class created dynamically with run method calling the underlying function (async or sync).
    • Non-WorkflowResult results are wrapped in WorkflowResult(value=...).

Updated examples to follow

Summary by CodeRabbit

  • New Features

    • Declare synchronous and asynchronous tools with new decorators; tools register automatically when the server is available.
    • Convert plain functions into workflows with generated run/status tools and accurate parameter schemas (excludes self), supporting both sync and async behavior.
  • Refactor

    • More robust workflow registry resolution and parameter sourcing for tooling; avoids duplicate registrations.
  • Tests

    • Added coverage for tool decorators, async/sync execution paths, result wrapping, and workflow run parameter schemas.
  • Style

    • Formatting-only updates to CLI and multiple test files; no behavioral changes.

@coderabbitai
Copy link

coderabbitai bot commented Aug 29, 2025

Walkthrough

Adds decorators to declare sync/async MCP tools with deferred registration and dynamic Workflow generation. Extends server to resolve workflow registries, derive parameter sources, build run-parameter tools, and register declared-function tools (idempotently). Updates CLI import formatting. Adds tests for workflow schema/tool decorators; other test edits are formatting-only. Duplicate helper exists in app.py.

Changes

Cohort / File(s) Summary of Changes
App: Deferred tool decorators & dynamic workflows
src/mcp_agent/app.py
Introduces deferred tool declarations (self._declared_tools), tool and async_tool decorators, and dynamic Workflow generation from functions (sync/async handling, parameter-source hook). Notes: duplicate _create_workflow_from_function defined twice.
Server: Tooling & registry integration
src/mcp_agent/server/app_server.py
Enhances registry resolution; adds parameter-source selection and run-parameter tool builder; implements registration of function-declared tools with sync/async paths and idempotency tracking; skips generic tooling for marked function tools; integrates into app lifespan and attached FastMCP.
CLI: Import formatting
src/mcp_agent/cli/cloud/main.py
Reformats import into multiline; no functional changes.
Tests: New server behavior
tests/server/test_app_server_workflow_schema.py, tests/server/test_tool_decorators.py
Adds tests validating per-workflow run tool schema (excluding self) and declared-tool registration/execution for sync/async, including polling-based status checks and WorkflowResult wrapping.
Tests: Formatting-only changes (CLI)
tests/cli/commands/*
Whitespace, import reflows, trailing commas, string quote style, and with-statement formatting across multiple CLI test modules; no logic changes.
Tests: Utils minor style
tests/cli/utils/jwt_generator.py
Inserts a blank line; no behavior change.

Sequence Diagram(s)

sequenceDiagram
  autonumber
  actor Dev as Developer
  participant App as MCPApp
  participant Decl as Deferred Store (_declared_tools)
  participant Server as App Server
  participant MCP as FastMCP

  Dev->>App: @app.tool / @app.async_tool(fn)
  App->>App: _create_workflow_from_function(fn)
  App->>App: self.workflow(auto_cls, id)
  App->>Decl: Append {name, mode, workflow, source_fn, ...}

  Server->>App: On startup / attach
  Server->>Server: _resolve_workflow_registry(...)
  Server->>Decl: Read declared tools
  Server->>Server: _get_param_source_function_from_workflow(...)
  Server->>Server: _build_run_param_tool(...)
  Server->>MCP: Register sync/async tool(s) + run/status tools
  Note over Server,MCP: Track registered to avoid duplicates
Loading
sequenceDiagram
  autonumber
  actor Client
  participant MCP as FastMCP
  participant Server as App Server
  participant WF as Workflow Registry/Engine

  Client->>MCP: Call tool "my_tool" (sync) OR "my_tool-run" (async)
  MCP->>Server: Invoke corresponding handler
  alt Sync tool
    Server->>WF: Execute workflow.run(params)
    WF-->>Server: Result (immediate)
    Server-->>MCP: Plain return (unwrapped)
  else Async tool
    Server->>WF: Start run(params)
    WF-->>Server: run_id
    Server-->>MCP: run_id
    loop Poll status
      Client->>MCP: _workflow_status(run_id)
      MCP->>Server: Status request
      Server->>WF: Get status(run_id)
      WF-->>Server: {status, result?}
      Server-->>MCP: Status payload
    end
  end
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Possibly related PRs

Suggested reviewers

  • rholinshead
  • petersonbill64

Poem

I nibbled code like clover leaves,
Wrapped plain funcs in Workflow sleeves.
Tools deferred, then hop to play,
Sync or async, night or day.
Schemas trimmed—no “self” to see—
Carrots queued in registry.
Thump! Another tidy spree. 🥕🐇

Tip

🔌 Remote MCP (Model Context Protocol) integration is now available!

Pro plan users can now connect to remote MCP servers from the Integrations page. Connect with popular remote MCPs such as Notion and Linear to add more context to your reviews and chats.

✨ Finishing Touches
  • 📝 Generate Docstrings
🧪 Generate unit tests
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch feat/tool_decorator

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share
🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbit in a new review comment at the desired location with your query.
  • PR comments: Tag @coderabbit in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbit gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbit read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.

Support

Need help? Create a ticket on our support page for assistance with any issues or questions.

CodeRabbit Commands (Invoked using PR/Issue comments)

Type @coderabbit help to get the list of available commands.

Other keywords and placeholders

  • Add @coderabbit ignore or @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbit summary or @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbit or @coderabbitai anywhere in the PR title to generate the title automatically.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Status, Documentation and Community

  • Visit our Status Page to check the current availability of CodeRabbit.
  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 12

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (3)
src/mcp_agent/server/app_server.py (3)

267-281: list_workflows leaks ‘self’ in param schema and exposes endpoints that might not exist

  • Use _build_run_param_tool to drop ‘self’ for class run().
  • Align “tool_endpoints” with the skip logic in create_workflow_tools:
    • sync auto tool: [, -get_status]
    • async auto tool: [-async-run, -async-get_status]
    • generic workflow: [workflows--run, workflows--get_status]

Apply:

-            # Get workflow documentation
-            run_fn_tool = FastTool.from_function(workflow_cls.run)
+            # Get workflow documentation (drop 'self' when needed)
+            run_fn_tool = _build_run_param_tool(workflow_cls)
@@
-            endpoints = [
-                f"workflows-{workflow_name}-run",
-                f"workflows-{workflow_name}-get_status",
-            ]
+            if getattr(workflow_cls, "__mcp_agent_sync_tool__", False):
+                endpoints = [workflow_name, f"{workflow_name}-get_status"]
+            elif getattr(workflow_cls, "__mcp_agent_async_tool__", False):
+                endpoints = [f"{workflow_name}-async-run", f"{workflow_name}-async-get_status"]
+            else:
+                endpoints = [
+                    f"workflows-{workflow_name}-run",
+                    f"workflows-{workflow_name}-get_status",
+                ]

376-382: resume_workflow fails without managed lifespan (no fallback to attached context)

Unlike runs-list, this assumes ctx.request_context.lifespan_context. Provide the same fallback to _get_attached_server_context.

Apply:

-        server_context: ServerContext = ctx.request_context.lifespan_context
-        workflow_registry = server_context.workflow_registry
+        server_context = getattr(ctx.request_context, "lifespan_context", None) or _get_attached_server_context(ctx.fastmcp)
+        if server_context is None or not hasattr(server_context, "workflow_registry"):
+            raise ToolError("Server context not available for MCPApp Server.")
+        workflow_registry = server_context.workflow_registry

418-426: cancel_workflow has the same managed-lifespan-only assumption

Mirror the fallback to support externally attached FastMCP.

Apply:

-        server_context: ServerContext = ctx.request_context.lifespan_context
-        workflow_registry = server_context.workflow_registry
+        server_context = getattr(ctx.request_context, "lifespan_context", None) or _get_attached_server_context(ctx.fastmcp)
+        if server_context is None or not hasattr(server_context, "workflow_registry"):
+            raise ToolError("Server context not available for MCPApp Server.")
+        workflow_registry = server_context.workflow_registry
🧹 Nitpick comments (16)
tests/cli/commands/test_cli_secrets.py (1)

464-466: Deduplicate and reuse the same output normalization.

Keep test behavior consistent by calling the same helper here.

Apply this diff:

-        clean_text = " ".join(
-            re.sub(r"[^\x00-\x7F]+", " ", combined_output).split()
-        ).lower()
+        clean_text = normalize_output(combined_output)
src/mcp_agent/app.py (3)

118-129: Comment/data mismatch in declared tool schema.

The comment lists tool_wrapper but the stored dicts don’t include it. Align the comment or add the field.

Apply this diff to fix the comment:

-        #   "tool_wrapper": Callable | None,

602-626: Guard against duplicate sync tool declarations.

Repeated decoration with the same name appends duplicates to _declared_tools. Add a simple dedupe guard.

Apply this diff:

         def decorator(fn: Callable[..., Any]) -> Callable[..., Any]:
             tool_name = name or fn.__name__
+            # Idempotency: avoid duplicate declarations
+            if any(d.get("name") == tool_name and d.get("mode") == "sync" for d in self._declared_tools):
+                return fn
             # Construct the workflow from function

642-663: Guard against duplicate async tool declarations.

Mirror the sync dedupe to avoid multiple entries for the same async tool.

Apply this diff:

         def decorator(fn: Callable[..., Any]) -> Callable[..., Any]:
             workflow_name = name or fn.__name__
+            if any(d.get("name") == workflow_name and d.get("mode") == "async" for d in self._declared_tools):
+                return fn
             workflow_cls = self._create_workflow_from_function(
tests/cli/commands/test_app_workflows.py (4)

1-2: Fix misleading module docstring

This file tests the “workflows” command, not “configure”.

-"""Tests for the configure command."""
+"""Tests for the app workflows command."""

61-74: Use timezone-aware datetimes in test data

Stay consistent with UTC-aware datetimes used elsewhere to avoid tz-related assertions later.

-    app = MCPApp(
+    app = MCPApp(
         appId=MOCK_APP_ID,
         name="name",
         creatorId="creatorId",
-        createdAt=datetime.datetime.now(),
-        updatedAt=datetime.datetime.now(),
+        createdAt=datetime.datetime.now(datetime.timezone.utc),
+        updatedAt=datetime.datetime.now(datetime.timezone.utc),
         appServerInfo=app_server_info,
     )

95-106: Same timezone-awareness nit

Mirror the UTC-aware datetime change for AppConfiguration creation to keep consistency.

-    app_config = MCPAppConfiguration(
+    app_config = MCPAppConfiguration(
         appConfigurationId=MOCK_APP_CONFIG_ID,
         creatorId="creator",
         appServerInfo=app_server_info,
     )

(If createdAt/updatedAt fields get added here in future, prefer timezone-aware values.)

Also applies to: 110-113, 121-124


61-61: Rename test functions for clarity

Names say “status” but this file tests “workflows”.

-def test_status_app(patched_workflows_app, mock_mcp_client):
+def test_workflows_app(patched_workflows_app, mock_mcp_client):
@@
-def test_status_app_config(patched_workflows_app, mock_mcp_client):
+def test_workflows_app_config(patched_workflows_app, mock_mcp_client):

Also applies to: 95-95

tests/cli/commands/test_app_status.py (3)

1-2: Fix misleading module docstring

This file tests the “status” command.

-"""Tests for the configure command."""
+"""Tests for the app status command."""

61-74: Use timezone-aware datetimes in test data

Align with UTC-aware datetimes used by mocks.

-    app = MCPApp(
+    app = MCPApp(
         appId=MOCK_APP_ID,
         name="name",
         creatorId="creatorId",
-        createdAt=datetime.datetime.now(),
-        updatedAt=datetime.datetime.now(),
+        createdAt=datetime.datetime.now(datetime.timezone.utc),
+        updatedAt=datetime.datetime.now(datetime.timezone.utc),
         appServerInfo=app_server_info,
     )

95-106: Minor: Ensure tz-aware datetimes if added

If AppConfiguration objects later include timestamps, prefer timezone-aware values for consistency.

Also applies to: 110-113, 121-124

tests/server/test_app_server_workflow_schema.py (1)

24-58: Assert schema by parsing JSON instead of substring search

Parsing the embedded JSON makes the test less brittle to formatting.

-    desc = kwargs.get("description", "")
-    # The description embeds the JSON schema; assert basic fields are referenced
-    assert "q" in desc
-    assert "flag" in desc
-    assert "self" not in desc
+    desc = kwargs.get("description", "")
+    # Extract the first JSON object from the description and parse it
+    import json, re
+    m = re.search(r'\{.*\}', desc, flags=re.S)
+    assert m, "No JSON schema found in description"
+    schema = json.loads(m.group(0))
+    props = schema.get("properties", {})
+    assert "q" in props
+    assert "flag" in props
+    assert "self" not in props
tests/server/test_tool_decorators.py (2)

42-57: Attach app to server_context to avoid fallback heuristics

Make tests more direct by including app on the context used by server helpers.

-    ctx = SimpleNamespace()
+    ctx = SimpleNamespace()
@@
-    req = SimpleNamespace(lifespan_context=server_context)
+    req = SimpleNamespace(lifespan_context=server_context)
     ctx.request_context = req
     ctx.fastmcp = SimpleNamespace(_mcp_agent_app=None)
     return ctx

Additionally, when constructing server_context:

-    server_context = type(
-        "SC", (), {"workflows": app.workflows, "context": app.context}
-    )()
+    server_context = type(
+        "SC", (), {"workflows": app.workflows, "context": app.context, "app": app}
+    )()

95-111: Factor polling into a tiny helper to reduce flakiness/noise

DRY the completion wait and centralize timeout.

@@
-    for _ in range(200):
-        status = await _workflow_status(ctx, run_id, "echo")
-        if status.get("completed"):
-            break
-        await asyncio.sleep(0.01)
+    async def _wait_done(name, rid, timeout_s=2.0):
+        for _ in range(int(timeout_s / 0.01)):
+            s = await _workflow_status(ctx, rid, name)
+            if s.get("completed"):
+                return s
+            await asyncio.sleep(0.01)
+        return s
+    status = await _wait_done("echo", run_id)
@@
-    for _ in range(100):
-        status = await _workflow_status(ctx, run_id, "wrapme")
-        if status.get("completed"):
-            break
-        await asyncio.sleep(0.01)
+    status = await _wait_done("wrapme", run_id)

Also applies to: 158-174

src/mcp_agent/server/app_server.py (2)

151-160: Param-source resolution for schema is correct

Good fallback to workflow.run when a custom param-source isn’t present. Consider annotating the return type as Callable[..., Any] for clarity.


536-546: Guard against missing/unknown declarations

If a declaration is missing source_fn or has an unexpected mode, we silently continue. Consider logging at debug/warn to aid diagnosis.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

💡 Knowledge Base configuration:

  • MCP integration is disabled by default for public repositories
  • Jira integration is disabled by default for public repositories
  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between f3a1331 and bccedcd.

📒 Files selected for processing (12)
  • src/mcp_agent/app.py (2 hunks)
  • src/mcp_agent/cli/cloud/main.py (1 hunks)
  • src/mcp_agent/server/app_server.py (5 hunks)
  • tests/cli/commands/test_app_delete.py (5 hunks)
  • tests/cli/commands/test_app_status.py (7 hunks)
  • tests/cli/commands/test_app_workflows.py (7 hunks)
  • tests/cli/commands/test_cli_secrets.py (2 hunks)
  • tests/cli/commands/test_configure.py (1 hunks)
  • tests/cli/commands/test_deploy_command.py (4 hunks)
  • tests/cli/utils/jwt_generator.py (1 hunks)
  • tests/server/test_app_server_workflow_schema.py (1 hunks)
  • tests/server/test_tool_decorators.py (1 hunks)
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-08-28T15:07:09.951Z
Learnt from: saqadri
PR: lastmile-ai/mcp-agent#386
File: src/mcp_agent/mcp/mcp_server_registry.py:110-116
Timestamp: 2025-08-28T15:07:09.951Z
Learning: In MCP server registry methods, when client_session_factory parameters are updated to accept additional context parameters, ensure the type hints match what is actually passed (Context instance vs ServerSession) and that the default factory (MCPAgentClientSession) can handle the number of arguments being passed to avoid TypeError at runtime.

Applied to files:

  • src/mcp_agent/server/app_server.py
🧬 Code graph analysis (8)
tests/cli/commands/test_configure.py (1)
src/mcp_agent/cli/mcp_app/mock_client.py (1)
  • MockMCPAppClient (21-196)
tests/server/test_app_server_workflow_schema.py (3)
src/mcp_agent/server/app_server.py (4)
  • app (78-80)
  • create_workflow_tools (441-462)
  • run (704-708)
  • workflows (83-85)
src/mcp_agent/app.py (9)
  • MCPApp (40-796)
  • executor (161-162)
  • workflow (399-432)
  • tool (587-627)
  • initialize (195-292)
  • workflow_run (484-524)
  • run (374-397)
  • workflows (177-178)
  • context (145-150)
src/mcp_agent/executor/workflow.py (1)
  • WorkflowResult (55-59)
tests/cli/commands/test_app_status.py (5)
src/mcp_agent/cli/mcp_app/mock_client.py (1)
  • MockMCPAppClient (21-196)
tests/cli/commands/test_app_delete.py (1)
  • mock_mcp_client (18-30)
tests/cli/commands/test_app_workflows.py (1)
  • mock_mcp_client (20-29)
tests/cli/commands/test_configure.py (1)
  • mock_mcp_client (16-27)
src/mcp_agent/cli/mcp_app/api_client.py (1)
  • get_app_or_config (224-260)
src/mcp_agent/server/app_server.py (4)
src/mcp_agent/core/context.py (2)
  • mcp (98-99)
  • Context (57-99)
src/mcp_agent/executor/workflow.py (4)
  • run_id (129-134)
  • get_status (574-615)
  • executor (114-119)
  • WorkflowResult (55-59)
src/mcp_agent/executor/temporal/workflow_registry.py (1)
  • get_workflow (63-66)
src/mcp_agent/app.py (3)
  • executor (161-162)
  • workflow (399-432)
  • tool (587-627)
tests/cli/commands/test_app_delete.py (3)
src/mcp_agent/cli/mcp_app/mock_client.py (1)
  • MockMCPAppClient (21-196)
src/mcp_agent/cli/cloud/commands/app/delete/main.py (1)
  • delete_app (22-131)
src/mcp_agent/cli/mcp_app/api_client.py (3)
  • delete_app (444-475)
  • MCPAppConfiguration (33-38)
  • delete_app_configuration (477-510)
tests/server/test_tool_decorators.py (3)
src/mcp_agent/server/app_server.py (7)
  • app (78-80)
  • create_workflow_tools (441-462)
  • create_declared_function_tools (473-656)
  • _workflow_run (768-816)
  • _workflow_status (819-834)
  • workflow_registry (88-90)
  • workflows (83-85)
src/mcp_agent/app.py (7)
  • MCPApp (40-796)
  • tool (587-627)
  • executor (161-162)
  • initialize (195-292)
  • workflows (177-178)
  • context (145-150)
  • async_tool (629-664)
src/mcp_agent/executor/workflow_registry.py (1)
  • InMemoryWorkflowRegistry (150-264)
tests/cli/commands/test_app_workflows.py (3)
src/mcp_agent/cli/mcp_app/mock_client.py (1)
  • MockMCPAppClient (21-196)
tests/cli/commands/test_app_status.py (1)
  • mock_mcp_client (20-29)
src/mcp_agent/cli/mcp_app/api_client.py (1)
  • get_app_or_config (224-260)
src/mcp_agent/app.py (1)
src/mcp_agent/executor/workflow.py (3)
  • executor (114-119)
  • Workflow (72-752)
  • WorkflowResult (55-59)
🔇 Additional comments (21)
tests/cli/commands/test_configure.py (1)

10-12: LGTM on import reformat.

tests/cli/utils/jwt_generator.py (1)

209-211: LGTM. Extra newline is harmless.

tests/cli/commands/test_deploy_command.py (3)

134-137: LGTM on patch context refactor.


167-170: LGTM on patch context refactor.


299-302: LGTM on patch context refactor.

tests/cli/commands/test_app_delete.py (2)

77-79: LGTM. Exercise both dry-run and actual delete paths.


82-84: LGTM. Minimal MCPAppConfiguration instantiation is fine.

src/mcp_agent/cli/cloud/main.py (1)

17-21: Import reformat only — OK

Style-only change; no functional impact.

tests/cli/commands/test_app_workflows.py (2)

33-59: Wrapper/patch pattern looks correct

The Typer-exit shimming and client injection are fine for isolating CLI behavior.


79-82: Expectation assertions — OK

Correctly validates the delegated call with server_url and api_key.

Also applies to: 90-92

tests/cli/commands/test_app_status.py (1)

79-82: Expectation assertions — OK

Correctly validates server details printing.

Also applies to: 90-92

tests/server/test_tool_decorators.py (3)

13-27: Tool recorder stub — OK

Captures decorated tools reliably for assertions.


64-67: Using @app.tool with an async function — confirm semantics or add a sync-case test

@app.tool is documented as “sync tool” (returns final value). It’s fine to use an async source fn, but please confirm this is intended and supported long-term, or add a second test with a plain def to cover the canonical path.

Would you like me to add a small test exercising a synchronous function decorated with @app.tool to lock in the unwrapping behavior?


126-134: Async aliases assertions — OK

Correctly validates presence of -async-run and -get_status tools; suppression of generic workflows-* for async auto tools is implied elsewhere.

src/mcp_agent/server/app_server.py (7)

135-147: Registry resolution precedence looks good

Preferring the inner app-context registry avoids stale/mocked registries. No issues.


212-214: Lifespan integration for declared tools: LGTM

Registering declared function tools during app lifespan ensures availability at startup.


237-239: Attachment-path integration for declared tools: LGTM

Mirrors lifespan behavior when FastMCP is externally provided.


453-457: Skipping generic workflow endpoints for auto tools — verify client/test expectations

Since auto tools won’t get workflows--* endpoints, ensure tests and UIs don’t rely on those for sync/async auto tools. If they do, list_workflows must also reflect this (see comment on Lines 267-281).


465-471: Registered function-tools tracking: LGTM

Set-based de-dup prevents double registration across contexts.


656-657: Persist registered set back on mcp: LGTM

Idempotency preserved across attaches.


689-721: Minor: keep run_parameters schema generation in sync with helper

No functional change needed after adopting _build_run_param_tool above. Just re-run tests to ensure formatting remains the same.

Comment on lines +526 to +586
def _create_workflow_from_function(
self,
fn: Callable[..., Any],
*,
workflow_name: str,
description: str | None = None,
mark_sync_tool: bool = False,
) -> Type:
"""
Create a Workflow subclass dynamically from a plain function.
The generated workflow class will:
- Have `run` implemented to call the provided function
- Be decorated with engine-specific run decorators via workflow_run
- Expose the original function for parameter schema generation
"""

import asyncio as _asyncio
from mcp_agent.executor.workflow import Workflow as _Workflow

async def _invoke_target(*args, **kwargs):
# Support both async and sync callables
res = fn(*args, **kwargs)
if _asyncio.iscoroutine(res):
res = await res

# Ensure WorkflowResult return type
try:
from mcp_agent.executor.workflow import (
WorkflowResult as _WorkflowResult,
)
except Exception:
_WorkflowResult = None # type: ignore[assignment]

if _WorkflowResult is not None and not isinstance(res, _WorkflowResult):
return _WorkflowResult(value=res)
return res

async def _run(self, *args, **kwargs): # type: ignore[no-redef]
return await _invoke_target(*args, **kwargs)

# Decorate run with engine-specific decorator
decorated_run = self.workflow_run(_run)

# Build the Workflow subclass dynamically
cls_dict: Dict[str, Any] = {
"__doc__": description or (fn.__doc__ or ""),
"run": decorated_run,
"__mcp_agent_param_source_fn__": fn,
}
if mark_sync_tool:
cls_dict["__mcp_agent_sync_tool__"] = True
else:
cls_dict["__mcp_agent_async_tool__"] = True

auto_cls = type(f"AutoWorkflow_{workflow_name}", (_Workflow,), cls_dict)

# Register with app (and apply engine-specific workflow decorator)
self.workflow(auto_cls, workflow_id=workflow_name)
return auto_cls

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Bug: return undecorated class and invalid auto class names when tool name has hyphens.

  • type() requires an identifier; names like "my-tool" will fail.
  • _create_workflow_from_function returns auto_cls before engine-specific decoration; downstream may use the wrong class.

Apply this diff:

         import asyncio as _asyncio
+        import re as _re
         from mcp_agent.executor.workflow import Workflow as _Workflow
@@
-        auto_cls = type(f"AutoWorkflow_{workflow_name}", (_Workflow,), cls_dict)
+        _safe_suffix = _re.sub(r"\W|^(?=\d)", "_", workflow_name)
+        auto_cls = type(f"AutoWorkflow_{_safe_suffix}", (_Workflow,), cls_dict)
 
-        # Register with app (and apply engine-specific workflow decorator)
-        self.workflow(auto_cls, workflow_id=workflow_name)
-        return auto_cls
+        # Register with app (and apply engine-specific workflow decorator)
+        decorated_cls = self.workflow(auto_cls, workflow_id=workflow_name)
+        return decorated_cls
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
def _create_workflow_from_function(
self,
fn: Callable[..., Any],
*,
workflow_name: str,
description: str | None = None,
mark_sync_tool: bool = False,
) -> Type:
"""
Create a Workflow subclass dynamically from a plain function.
The generated workflow class will:
- Have `run` implemented to call the provided function
- Be decorated with engine-specific run decorators via workflow_run
- Expose the original function for parameter schema generation
"""
import asyncio as _asyncio
from mcp_agent.executor.workflow import Workflow as _Workflow
async def _invoke_target(*args, **kwargs):
# Support both async and sync callables
res = fn(*args, **kwargs)
if _asyncio.iscoroutine(res):
res = await res
# Ensure WorkflowResult return type
try:
from mcp_agent.executor.workflow import (
WorkflowResult as _WorkflowResult,
)
except Exception:
_WorkflowResult = None # type: ignore[assignment]
if _WorkflowResult is not None and not isinstance(res, _WorkflowResult):
return _WorkflowResult(value=res)
return res
async def _run(self, *args, **kwargs): # type: ignore[no-redef]
return await _invoke_target(*args, **kwargs)
# Decorate run with engine-specific decorator
decorated_run = self.workflow_run(_run)
# Build the Workflow subclass dynamically
cls_dict: Dict[str, Any] = {
"__doc__": description or (fn.__doc__ or ""),
"run": decorated_run,
"__mcp_agent_param_source_fn__": fn,
}
if mark_sync_tool:
cls_dict["__mcp_agent_sync_tool__"] = True
else:
cls_dict["__mcp_agent_async_tool__"] = True
auto_cls = type(f"AutoWorkflow_{workflow_name}", (_Workflow,), cls_dict)
# Register with app (and apply engine-specific workflow decorator)
self.workflow(auto_cls, workflow_id=workflow_name)
return auto_cls
import asyncio as _asyncio
import re as _re
from mcp_agent.executor.workflow import Workflow as _Workflow
async def _invoke_target(*args, **kwargs):
# Support both async and sync callables
res = fn(*args, **kwargs)
if _asyncio.iscoroutine(res):
res = await res
# Ensure WorkflowResult return type
try:
from mcp_agent.executor.workflow import (
WorkflowResult as _WorkflowResult,
)
except Exception:
_WorkflowResult = None # type: ignore[assignment]
if _WorkflowResult is not None and not isinstance(res, _WorkflowResult):
return _WorkflowResult(value=res)
return res
async def _run(self, *args, **kwargs): # type: ignore[no-redef]
return await _invoke_target(*args, **kwargs)
# Decorate run with engine-specific decorator
decorated_run = self.workflow_run(_run)
# Build the Workflow subclass dynamically
cls_dict: Dict[str, Any] = {
"__doc__": description or (fn.__doc__ or ""),
"run": decorated_run,
"__mcp_agent_param_source_fn__": fn,
}
if mark_sync_tool:
cls_dict["__mcp_agent_sync_tool__"] = True
else:
cls_dict["__mcp_agent_async_tool__"] = True
# Sanitize workflow_name into a valid Python identifier suffix
_safe_suffix = _re.sub(r"\W|^(?=\d)", "_", workflow_name)
auto_cls = type(f"AutoWorkflow_{_safe_suffix}", (_Workflow,), cls_dict)
# Register with app (and apply engine-specific workflow decorator)
decorated_cls = self.workflow(auto_cls, workflow_id=workflow_name)
return decorated_cls

Comment on lines +162 to +186
def _build_run_param_tool(workflow_cls: Type["Workflow"]) -> FastTool:
"""Return a FastTool built from the proper parameter source, skipping 'self'."""
param_source = _get_param_source_function_from_workflow(workflow_cls)
import inspect as _inspect

if param_source is getattr(workflow_cls, "run"):

def _schema_fn_proxy(*args, **kwargs):
return None

sig = _inspect.signature(param_source)
params = list(sig.parameters.values())
if params and params[0].name == "self":
params = params[1:]
_schema_fn_proxy.__annotations__ = dict(
getattr(param_source, "__annotations__", {})
)
if "self" in _schema_fn_proxy.__annotations__:
_schema_fn_proxy.__annotations__.pop("self", None)
_schema_fn_proxy.__signature__ = _inspect.Signature(
parameters=params, return_annotation=sig.return_annotation
)
return FastTool.from_function(_schema_fn_proxy)
return FastTool.from_function(param_source)

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

_build_run_param_tool is unused; wire it in to remove duplication and prevent schema drift

You reimplemented the “drop self from run()” logic elsewhere. Use this helper in list_workflows and create_workflow_specific_tools to keep behavior consistent and avoid future divergence.

🤖 Prompt for AI Agents
In src/mcp_agent/server/app_server.py around lines 162 to 186, the helper
_build_run_param_tool implements the "drop self from run()" schema logic but is
not used elsewhere; locate the duplicated logic in list_workflows and
create_workflow_specific_tools and replace those code blocks with calls to
_build_run_param_tool(workflow_cls) so both places produce the same FastTool;
ensure any existing local adjustments (annotations/signature tweaks) are removed
and that the helper is imported/visible in the module scope so no additional
duplication remains.

Comment on lines +503 to +535
ctx: MCPContext, run_id: str, timeout: float | None = None
):
registry = _resolve_workflow_registry(ctx)
if not registry:
raise ToolError("Workflow registry not found for MCPApp Server.")
# Try to get the workflow and wait on its task if available
start = asyncio.get_event_loop().time()
# Ensure the workflow is registered locally to retrieve the task
try:
wf = await registry.get_workflow(run_id)
if wf is None and hasattr(registry, "register"):
# Best-effort: some registries need explicit register; try to find by status
# and skip if unavailable. This is a no-op for InMemory which registers at run_async.
pass
except Exception:
pass
while True:
wf = await registry.get_workflow(run_id)
if wf is not None:
task = getattr(wf, "_run_task", None)
if isinstance(task, asyncio.Task):
return await asyncio.wait_for(task, timeout=timeout)
# Fallback to polling the status
status = await wf.get_status()
if status.get("completed"):
return status.get("result")
if (
timeout is not None
and (asyncio.get_event_loop().time() - start) > timeout
):
raise ToolError("Timed out waiting for workflow completion")
await asyncio.sleep(0.1)

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Synchronous tools may return serialized WorkflowResult instead of original return value (temporal/remote registries)

When the workflow isn’t local, _wait_for_completion falls back to get_status() and returns status["result"], which is a serialized dict/string. This violates the contract “@app.tool returns the original function’s return value (unwrapped)”.

Apply this diff to robustly unwrap serialized WorkflowResult and use monotonic time:

-        start = asyncio.get_event_loop().time()
+        loop = asyncio.get_running_loop()
+        start = loop.time()
@@
-        while True:
+        while True:
             wf = await registry.get_workflow(run_id)
             if wf is not None:
                 task = getattr(wf, "_run_task", None)
                 if isinstance(task, asyncio.Task):
                     return await asyncio.wait_for(task, timeout=timeout)
                 # Fallback to polling the status
                 status = await wf.get_status()
                 if status.get("completed"):
-                    return status.get("result")
+                    result = status.get("result")
+                    # Best-effort unwrap of serialized WorkflowResult
+                    if isinstance(result, dict) and "value" in result and (
+                        {"metadata", "start_time", "end_time"} & set(result.keys())
+                    ):
+                        return result.get("value")
+                    return result
             if (
                 timeout is not None
-                and (asyncio.get_event_loop().time() - start) > timeout
+                and (loop.time() - start) > timeout
             ):
                 raise ToolError("Timed out waiting for workflow completion")
             await asyncio.sleep(0.1)
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
ctx: MCPContext, run_id: str, timeout: float | None = None
):
registry = _resolve_workflow_registry(ctx)
if not registry:
raise ToolError("Workflow registry not found for MCPApp Server.")
# Try to get the workflow and wait on its task if available
start = asyncio.get_event_loop().time()
# Ensure the workflow is registered locally to retrieve the task
try:
wf = await registry.get_workflow(run_id)
if wf is None and hasattr(registry, "register"):
# Best-effort: some registries need explicit register; try to find by status
# and skip if unavailable. This is a no-op for InMemory which registers at run_async.
pass
except Exception:
pass
while True:
wf = await registry.get_workflow(run_id)
if wf is not None:
task = getattr(wf, "_run_task", None)
if isinstance(task, asyncio.Task):
return await asyncio.wait_for(task, timeout=timeout)
# Fallback to polling the status
status = await wf.get_status()
if status.get("completed"):
return status.get("result")
if (
timeout is not None
and (asyncio.get_event_loop().time() - start) > timeout
):
raise ToolError("Timed out waiting for workflow completion")
await asyncio.sleep(0.1)
ctx: MCPContext, run_id: str, timeout: float | None = None
):
registry = _resolve_workflow_registry(ctx)
if not registry:
raise ToolError("Workflow registry not found for MCPApp Server.")
# Try to get the workflow and wait on its task if available
loop = asyncio.get_running_loop()
start = loop.time()
# Ensure the workflow is registered locally to retrieve the task
try:
wf = await registry.get_workflow(run_id)
if wf is None and hasattr(registry, "register"):
# Best-effort: some registries need explicit register; try to find by status
# and skip if unavailable. This is a no-op for InMemory which registers at run_async.
pass
except Exception:
pass
while True:
wf = await registry.get_workflow(run_id)
if wf is not None:
task = getattr(wf, "_run_task", None)
if isinstance(task, asyncio.Task):
return await asyncio.wait_for(task, timeout=timeout)
# Fallback to polling the status
status = await wf.get_status()
if status.get("completed"):
result = status.get("result")
# Best-effort unwrap of serialized WorkflowResult
if isinstance(result, dict) and "value" in result and (
{"metadata", "start_time", "end_time"} & set(result.keys())
):
return result.get("value")
return result
if (
timeout is not None
and (loop.time() - start) > timeout
):
raise ToolError("Timed out waiting for workflow completion")
await asyncio.sleep(0.1)

Comment on lines +550 to +616
async def _wrapper(**kwargs):
# Context will be injected by FastMCP using the special annotation below
ctx: MCPContext = kwargs.pop(
"__context__"
) # placeholder, reassigned below via signature name
# Start workflow and wait for completion
result_ids = await _workflow_run(ctx, workflow_name, kwargs)
run_id = result_ids["run_id"]
result = await _wait_for_completion(ctx, run_id)
# Unwrap WorkflowResult to match the original function's return type
try:
from mcp_agent.executor.workflow import WorkflowResult as _WFRes
except Exception:
_WFRes = None # type: ignore
if _WFRes is not None and isinstance(result, _WFRes):
return getattr(result, "value", None)
# If get_status returned dict/str, pass through; otherwise return model
return result

# Attach introspection metadata to match the original function
ann = dict(getattr(fn, "__annotations__", {}))

# Choose a context kwarg name unlikely to clash with user params
ctx_param_name = "ctx"
from mcp.server.fastmcp import Context as _Ctx

ann[ctx_param_name] = _Ctx
ann["return"] = getattr(fn, "__annotations__", {}).get("return", return_ann)
_wrapper.__annotations__ = ann
_wrapper.__name__ = name
_wrapper.__doc__ = description or (fn.__doc__ or "")

# Build a fake signature containing original params plus context kwarg
params = list(sig.parameters.values())
ctx_param = inspect.Parameter(
ctx_param_name,
kind=inspect.Parameter.KEYWORD_ONLY,
annotation=_Ctx,
)
_wrapper.__signature__ = inspect.Signature(
parameters=params + [ctx_param], return_annotation=return_ann
)

# FastMCP expects the actual kwarg name for context; it detects it by annotation
# We need to map the injected kwarg inside the wrapper body. Achieve this by
# creating a thin adapter that renames the injected context kwarg.
async def _adapter(**kw):
# Receive validated args plus injected context kwarg
if ctx_param_name not in kw:
raise ToolError("Context not provided")
# Rename to the placeholder expected by _wrapper
kw["__context__"] = kw.pop(ctx_param_name)
return await _wrapper(**kw)

# Copy the visible signature/annotations to adapter for correct schema
_adapter.__annotations__ = _wrapper.__annotations__
_adapter.__name__ = _wrapper.__name__
_adapter.__doc__ = _wrapper.__doc__
_adapter.__signature__ = _wrapper.__signature__

# Register the main tool with the same signature as original
mcp.add_tool(
_adapter,
name=name,
description=description or (fn.__doc__ or ""),
structured_output=structured_output,
)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Late-binding closure bugs: capture loop variables (workflow_name, name) per iteration

Nested defs capture the loop vars by reference; after the loop, all tools may target the last workflow. Freeze values per-iteration.

Apply this diff to the sync path to bind workflow_name and avoid schema pollution:

-            async def _wrapper(**kwargs):
+            async def _wrapper(*, __wf_name: str = workflow_name, **kwargs):
@@
-                result_ids = await _workflow_run(ctx, workflow_name, kwargs)
+                result_ids = await _workflow_run(ctx, __wf_name, kwargs)
@@
-            async def _adapter(**kw):
+            async def _adapter(**kw):
                 # Receive validated args plus injected context kwarg
                 if ctx_param_name not in kw:
                     raise ToolError("Context not provided")
                 # Rename to the placeholder expected by _wrapper
                 kw["__context__"] = kw.pop(ctx_param_name)
-                return await _wrapper(**kw)
+                return await _wrapper(**kw)

Important: hide the helper kw-only param from the tool schema (already handled because you overwrite signature to only include original params + ctx).

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
async def _wrapper(**kwargs):
# Context will be injected by FastMCP using the special annotation below
ctx: MCPContext = kwargs.pop(
"__context__"
) # placeholder, reassigned below via signature name
# Start workflow and wait for completion
result_ids = await _workflow_run(ctx, workflow_name, kwargs)
run_id = result_ids["run_id"]
result = await _wait_for_completion(ctx, run_id)
# Unwrap WorkflowResult to match the original function's return type
try:
from mcp_agent.executor.workflow import WorkflowResult as _WFRes
except Exception:
_WFRes = None # type: ignore
if _WFRes is not None and isinstance(result, _WFRes):
return getattr(result, "value", None)
# If get_status returned dict/str, pass through; otherwise return model
return result
# Attach introspection metadata to match the original function
ann = dict(getattr(fn, "__annotations__", {}))
# Choose a context kwarg name unlikely to clash with user params
ctx_param_name = "ctx"
from mcp.server.fastmcp import Context as _Ctx
ann[ctx_param_name] = _Ctx
ann["return"] = getattr(fn, "__annotations__", {}).get("return", return_ann)
_wrapper.__annotations__ = ann
_wrapper.__name__ = name
_wrapper.__doc__ = description or (fn.__doc__ or "")
# Build a fake signature containing original params plus context kwarg
params = list(sig.parameters.values())
ctx_param = inspect.Parameter(
ctx_param_name,
kind=inspect.Parameter.KEYWORD_ONLY,
annotation=_Ctx,
)
_wrapper.__signature__ = inspect.Signature(
parameters=params + [ctx_param], return_annotation=return_ann
)
# FastMCP expects the actual kwarg name for context; it detects it by annotation
# We need to map the injected kwarg inside the wrapper body. Achieve this by
# creating a thin adapter that renames the injected context kwarg.
async def _adapter(**kw):
# Receive validated args plus injected context kwarg
if ctx_param_name not in kw:
raise ToolError("Context not provided")
# Rename to the placeholder expected by _wrapper
kw["__context__"] = kw.pop(ctx_param_name)
return await _wrapper(**kw)
# Copy the visible signature/annotations to adapter for correct schema
_adapter.__annotations__ = _wrapper.__annotations__
_adapter.__name__ = _wrapper.__name__
_adapter.__doc__ = _wrapper.__doc__
_adapter.__signature__ = _wrapper.__signature__
# Register the main tool with the same signature as original
mcp.add_tool(
_adapter,
name=name,
description=description or (fn.__doc__ or ""),
structured_output=structured_output,
)
async def _wrapper(*, __wf_name: str = workflow_name, **kwargs):
# Context will be injected by FastMCP using the special annotation below
ctx: MCPContext = kwargs.pop(
"__context__"
) # placeholder, reassigned below via signature name
# Start workflow and wait for completion
result_ids = await _workflow_run(ctx, __wf_name, kwargs)
run_id = result_ids["run_id"]
result = await _wait_for_completion(ctx, run_id)
# Unwrap WorkflowResult to match the original function's return type
try:
from mcp_agent.executor.workflow import WorkflowResult as _WFRes
except Exception:
_WFRes = None # type: ignore
if _WFRes is not None and isinstance(result, _WFRes):
return getattr(result, "value", None)
# If get_status returned dict/str, pass through; otherwise return model
return result
# Attach introspection metadata to match the original function
ann = dict(getattr(fn, "__annotations__", {}))
# Choose a context kwarg name unlikely to clash with user params
ctx_param_name = "ctx"
from mcp.server.fastmcp import Context as _Ctx
ann[ctx_param_name] = _Ctx
ann["return"] = getattr(fn, "__annotations__", {}).get("return", return_ann)
_wrapper.__annotations__ = ann
_wrapper.__name__ = name
_wrapper.__doc__ = description or (fn.__doc__ or "")
# Build a fake signature containing original params plus context kwarg
params = list(sig.parameters.values())
ctx_param = inspect.Parameter(
ctx_param_name,
kind=inspect.Parameter.KEYWORD_ONLY,
annotation=_Ctx,
)
_wrapper.__signature__ = inspect.Signature(
parameters=params + [ctx_param], return_annotation=return_ann
)
# FastMCP expects the actual kwarg name for context; it detects it by annotation
# We need to map the injected kwarg inside the wrapper body. Achieve this by
# creating a thin adapter that renames the injected context kwarg.
async def _adapter(**kw):
# Receive validated args plus injected context kwarg
if ctx_param_name not in kw:
raise ToolError("Context not provided")
# Rename to the placeholder expected by _wrapper
kw["__context__"] = kw.pop(ctx_param_name)
return await _wrapper(**kw)
🤖 Prompt for AI Agents
In src/mcp_agent/server/app_server.py around lines 550 to 616, nested async
functions (_wrapper and _adapter) close over loop variables like workflow_name
and name causing late-binding bugs so all registered tools end up referencing
the last loop iteration; fix by binding those variables per-iteration (e.g.,
capture workflow_name and name as default parameters on _wrapper and _adapter or
use functools.partial) so each closure gets its own copy, and apply the same
per-iteration binding to the sync registration path as well; keep the helper
kw-only ctx param hidden from the tool schema as currently handled by
overwriting __signature__.

Comment on lines +619 to +629
# Also register a per-run status tool: <tool-name>-get_status
status_tool_name = f"{name}-get_status"
if status_tool_name not in registered:

@mcp.tool(name=status_tool_name)
async def _sync_status(ctx: MCPContext, run_id: str) -> Dict[str, Any]:
return await _workflow_status(
ctx, run_id=run_id, workflow_name=workflow_name
)

registered.add(status_tool_name)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Late-binding in status tool for sync path; also prefer add_tool over decorator for factories

Freeze workflow_name and avoid exposing extra params by using a factory and registering the returned function.

Apply:

-            status_tool_name = f"{name}-get_status"
+            status_tool_name = f"{name}-get_status"
             if status_tool_name not in registered:
-
-                @mcp.tool(name=status_tool_name)
-                async def _sync_status(ctx: MCPContext, run_id: str) -> Dict[str, Any]:
-                    return await _workflow_status(
-                        ctx, run_id=run_id, workflow_name=workflow_name
-                    )
-
-                registered.add(status_tool_name)
+                def _make_sync_status(_wf_name: str):
+                    async def _sync_status(ctx: MCPContext, run_id: str) -> Dict[str, Any]:
+                        return await _workflow_status(ctx, run_id=run_id, workflow_name=_wf_name)
+                    return _sync_status
+                mcp.add_tool(_make_sync_status(workflow_name), name=status_tool_name)
+                registered.add(status_tool_name)
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
# Also register a per-run status tool: <tool-name>-get_status
status_tool_name = f"{name}-get_status"
if status_tool_name not in registered:
@mcp.tool(name=status_tool_name)
async def _sync_status(ctx: MCPContext, run_id: str) -> Dict[str, Any]:
return await _workflow_status(
ctx, run_id=run_id, workflow_name=workflow_name
)
registered.add(status_tool_name)
# Also register a per-run status tool: <tool-name>-get_status
status_tool_name = f"{name}-get_status"
if status_tool_name not in registered:
def _make_sync_status(_wf_name: str):
async def _sync_status(ctx: MCPContext, run_id: str) -> Dict[str, Any]:
return await _workflow_status(
ctx, run_id=run_id, workflow_name=_wf_name
)
return _sync_status
mcp.add_tool(_make_sync_status(workflow_name), name=status_tool_name)
registered.add(status_tool_name)
🤖 Prompt for AI Agents
In src/mcp_agent/server/app_server.py around lines 619-629, the status tool
currently closes over workflow_name late and is created via the @mcp.tool
decorator exposing extra params; replace it with a small factory that takes
workflow_name and returns an async handler that only accepts (ctx, run_id), then
register that returned function with the MCP runtime using mcp.add_tool (or the
equivalent add_tool API) under status_tool_name and add the name to registered.
Concretely: implement a def _make_status_tool(frozen_workflow_name): async def
_sync_status(ctx, run_id): return await _workflow_status(ctx, run_id=run_id,
workflow_name=frozen_workflow_name) return _sync_status; then call
mcp.add_tool(name=status_tool_name, func=_make_status_tool(workflow_name)) and
registered.add(status_tool_name).

Comment on lines 119 to 123
# Patch load_api_key_credentials to return None
with patch(
"mcp_agent.cli.cloud.commands.configure.main.load_api_key_credentials",
return_value=None,
"mcp_agent.cli.cloud.commands.configure.main.load_api_key_credentials",
return_value=None,
):
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Wrong module patched; patches don’t affect delete_app.

Patch the settings and load_api_key_credentials in the delete command module, not configure.

Apply this diff:

-    with patch("mcp_agent.cli.cloud.commands.configure.main.settings") as mock_settings:
+    with patch("mcp_agent.cli.cloud.commands.app.delete.main.settings") as mock_settings:
@@
-        with patch(
-            "mcp_agent.cli.cloud.commands.configure.main.load_api_key_credentials",
-            return_value=None,
-        ):
+        with patch(
+            "mcp_agent.cli.cloud.commands.app.delete.main.load_api_key_credentials",
+            return_value=None,
+        ):
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
# Patch load_api_key_credentials to return None
with patch(
"mcp_agent.cli.cloud.commands.configure.main.load_api_key_credentials",
return_value=None,
"mcp_agent.cli.cloud.commands.configure.main.load_api_key_credentials",
return_value=None,
):
# Patch settings in the delete command module
with patch("mcp_agent.cli.cloud.commands.app.delete.main.settings") as mock_settings:
# Patch load_api_key_credentials to return None
with patch(
"mcp_agent.cli.cloud.commands.app.delete.main.load_api_key_credentials",
return_value=None,
):
# …rest of test…
🤖 Prompt for AI Agents
tests/cli/commands/test_app_delete.py around lines 119 to 123: the test is
patching load_api_key_credentials in the configure module which doesn't affect
the delete_app command; update the patch targets to the delete_app command
module and also patch its settings import used by the command. Replace the
current patch path
"mcp_agent.cli.cloud.commands.configure.main.load_api_key_credentials" with the
correct module path for delete_app (e.g.
"mcp_agent.cli.cloud.commands.delete_app.load_api_key_credentials") and
add/adjust a patch for the settings object in that same delete_app module so the
test controls both credentials return value and settings used by delete_app.

Comment on lines +151 to 153
"mcp_agent.cli.cloud.commands.configure.main.load_api_key_credentials",
return_value=None,
):
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Verification agent

🧩 Analysis chain

Verify patch target for credentials loader

Confirm status command imports the loader from configure.main; otherwise this patch won’t intercept calls.


🏁 Script executed:

#!/bin/bash
rg -n -C2 -g 'src/**.py' '\bload_api_key_credentials\b'

Length of output: 206


🏁 Script executed:

#!/bin/bash
rg -n -C2 '\bload_api_key_credentials\b'

Length of output: 10247


Use correct module path when patching load_api_key_credentials
In tests/cli/commands/test_app_status.py (line 151), you’re patching

"mcp_agent.cli.cloud.commands.configure.main.load_api_key_credentials"

but the app-status command imports it from
mcp_agent.cli.cloud.commands.app.status.main. Update the patch to

"mcp_agent.cli.cloud.commands.app.status.main.load_api_key_credentials"
🤖 Prompt for AI Agents
In tests/cli/commands/test_app_status.py around lines 151 to 153, the test
patches the wrong import path for load_api_key_credentials; update the patch
target to the module the app-status command actually imports from by replacing
"mcp_agent.cli.cloud.commands.configure.main.load_api_key_credentials" with
"mcp_agent.cli.cloud.commands.app.status.main.load_api_key_credentials" so the
mocked function is applied where the code under test resolves the symbol.

Comment on lines +151 to 153
"mcp_agent.cli.cloud.commands.configure.main.load_api_key_credentials",
return_value=None,
):
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Verification agent

🧩 Analysis chain

Verify patch target for credentials loader

You’re patching configure.main.load_api_key_credentials while exercising workflows. If workflows imports the loader from a different module, this patch won’t take effect.

Run to confirm actual import site:


🏁 Script executed:

#!/bin/bash
rg -n -C2 -g 'src/**.py' '\bload_api_key_credentials\b'

Length of output: 206


🏁 Script executed:

#!/bin/bash
rg -n -C2 'load_api_key_credentials' -t py

Length of output: 10247


Patch loader import in workflows tests
In tests/cli/commands/test_app_workflows.py (lines 151–153), you’re patching mcp_agent.cli.cloud.commands.configure.main.load_api_key_credentials, but the workflows command imports load_api_key_credentials from mcp_agent.cli.cloud.commands.app.workflows.main. Update the patch target to:

with patch(
    "mcp_agent.cli.cloud.commands.app.workflows.main.load_api_key_credentials",
    return_value=None,
):
🤖 Prompt for AI Agents
In tests/cli/commands/test_app_workflows.py around lines 151 to 153, the patch
target is incorrect: it currently patches
"mcp_agent.cli.cloud.commands.configure.main.load_api_key_credentials" but the
workflows command actually imports load_api_key_credentials from
"mcp_agent.cli.cloud.commands.app.workflows.main"; update the patch call to
target
"mcp_agent.cli.cloud.commands.app.workflows.main.load_api_key_credentials"
(keeping return_value=None) so the test correctly patches the function used by
the workflows module.

Comment on lines +434 to +439
ascii_text = re.sub(r"[^A-z0-9 .,-]+", " ", combined_output)
# remove any remnants of colour codes
without_escape_codes = re.sub(r'\[\d+m', ' ', ascii_text)
without_escape_codes = re.sub(r"\[\d+m", " ", ascii_text)
# normalize spaces and convert to lower case
clean_text = ' '.join(without_escape_codes.split()).lower()
assert (
"does not exist" in clean_text
or "no such file" in clean_text
)
clean_text = " ".join(without_escape_codes.split()).lower()
assert "does not exist" in clean_text or "no such file" in clean_text
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Use a robust, consistent output normalizer (fix A-z class, handle ANSI escapes).

Current regex uses A-z (includes [, , ], ^, _, `) and misses common ANSI sequences like [1;31m, which can leak noise and cause flaky assertions. Centralize normalization and fix the character class.

Apply this diff within this block:

-        ascii_text = re.sub(r"[^A-z0-9 .,-]+", " ", combined_output)
-        # remove any remnants of colour codes
-        without_escape_codes = re.sub(r"\[\d+m", " ", ascii_text)
-        # normalize spaces and convert to lower case
-        clean_text = " ".join(without_escape_codes.split()).lower()
+        # Normalize output (strip ANSI escapes, remove non-ASCII noise, collapse spaces)
+        clean_text = normalize_output(combined_output)

Add this helper near the top of the file (after imports):

ANSI_ESCAPE_RE = re.compile(r"\x1B\[[0-?]*[ -/]*[@-~]")

def normalize_output(text: str) -> str:
    # Remove ANSI escapes
    text = ANSI_ESCAPE_RE.sub(" ", text)
    # Allow letters, digits, spaces, and basic punctuation
    text = re.sub(r"[^A-Za-z0-9 .,\-]+", " ", text)
    return " ".join(text.split()).lower()
🤖 Prompt for AI Agents
In tests/cli/commands/test_cli_secrets.py around lines 434 to 439, the current
ad-hoc normalization uses the invalid A-z range and a too-simple ANSI pattern;
add the ANSI_ESCAPE_RE and normalize_output helper near the top of the file
(after imports) as specified, update the test block to call
normalize_output(combined_output) instead of the three-step regex sequence, and
assert against the normalized string (e.g., assert "does not exist" in
normalized or "no such file" in normalized); ensure the helper uses the provided
ANSI escape regex and the corrected character class [A-Za-z0-9 .,\-]+.

Comment on lines +65 to 70
ascii_text = re.sub(r"[^A-z0-9.,-]+", "", result.stdout)
# remove any remnants of colour codes
without_escape_codes = re.sub(r'\[[0-9 ]+m', '', ascii_text)
without_escape_codes = re.sub(r"\[[0-9 ]+m", "", ascii_text)
# normalize spaces and convert to lower case
clean_text = ' '.join(without_escape_codes.split()).lower()
clean_text = " ".join(without_escape_codes.split()).lower()

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Fix regex ranges and ANSI escape handling.

  • A-z matches extra chars ([, , ], ^, _ , `). Use A-Za-z.
  • ANSI color codes require ESC. Use \x1b[...m.

Apply this diff:

-    ascii_text = re.sub(r"[^A-z0-9.,-]+", "", result.stdout)
+    ascii_text = re.sub(r"[^A-Za-z0-9.,-]+", "", result.stdout)
-    without_escape_codes = re.sub(r"\[[0-9 ]+m", "", ascii_text)
+    without_escape_codes = re.sub(r"\x1b\[[0-9;]*m", "", ascii_text)
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
ascii_text = re.sub(r"[^A-z0-9.,-]+", "", result.stdout)
# remove any remnants of colour codes
without_escape_codes = re.sub(r'\[[0-9 ]+m', '', ascii_text)
without_escape_codes = re.sub(r"\[[0-9 ]+m", "", ascii_text)
# normalize spaces and convert to lower case
clean_text = ' '.join(without_escape_codes.split()).lower()
clean_text = " ".join(without_escape_codes.split()).lower()
ascii_text = re.sub(r"[^A-Za-z0-9.,-]+", "", result.stdout)
# remove any remnants of colour codes
without_escape_codes = re.sub(r"\x1b\[[0-9;]*m", "", ascii_text)
# normalize spaces and convert to lower case
clean_text = " ".join(without_escape_codes.split()).lower()
🤖 Prompt for AI Agents
In tests/cli/commands/test_deploy_command.py around lines 65 to 70, the regex
ranges are too broad and the ANSI escape handling is incomplete: replace the
character class A-z with A-Za-z so only letters are matched (i.e. use
A-Za-z0-9.,-), and update the ANSI escape removal to match the escape character
plus CSI sequences (use the ESC sequence like \x1b followed by \[ and the
numeric/semicolon parameters ending with m, e.g. \x1b\[[0-9; ]+m) so color codes
are stripped correctly.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant